package com.faner4cloud.yun.queue.delay;

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RBlockingQueue;
import org.redisson.api.RDelayedQueue;
import org.redisson.api.RedissonClient;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.Date;

/**
 * 延迟阻塞队列
 *
 * @author faner
 * @since 2022-04-14
 */
@Component
@Slf4j
public class RedisDelayQueueManagerImpl<V> implements RedisDelayQueueManager<V> {

	@Autowired
	private RedissonClient defaultRedissonClient;

	/**
	 * 添加到一个延迟队列中
	 *
	 * @param delayEvent {@link DelayEvent}
	 */
	@Override
	public void put(DelayEvent delayEvent) {
		log.info("#RedisDelayQueueManagerImpl#塞入队列信息-名称 :{}, 值:{}, 过期时间:{}, 时间单位:{}", delayEvent.getDelayTypeEnum().getDesc(), delayEvent.getValue(), delayEvent.getDelayTime(), delayEvent.getTimeUnit());
		try {
			RBlockingQueue<String> blockingQueue = defaultRedissonClient.getBlockingQueue(delayEvent.getDelayTypeEnum().getCode());
			RDelayedQueue<String> delayedQueue = defaultRedissonClient.getDelayedQueue(blockingQueue);
			delayedQueue.offer(JSONUtil.toJsonStr(delayEvent), delayEvent.getDelayTime(), delayEvent.getTimeUnit());
		} catch (Exception e) {
			log.error("#RedisDelayQueueManagerImpl#塞入队列信息失败", e.getMessage());
		}
	}

	/**
	 * 从延迟队列中弹出过期的value
	 *
	 * @param delayTypeEnum {@link DelayTypeEnum}
	 * @return 拉取到的值
	 */
	@Override
	public String popup(DelayTypeEnum delayTypeEnum) {
		String value = null;
		try {
			RBlockingQueue<String> blockingQueue = defaultRedissonClient.getBlockingQueue(delayTypeEnum.getCode());
			value = blockingQueue.take();
		} catch (InterruptedException e) {
			log.error("#RedisDelayQueueManagerImpl#拉取队列信息失败", e.getMessage());
			Thread.currentThread().interrupt();
		}
		log.info("#RedisDelayQueueManagerImpl#拉取队列信息-名称:{}, 值:{}, 拉取到时间: {}", delayTypeEnum.getDesc(), value, new Date());
		return value;
	}
}

